package com.jfire.socket.socketserver.bus;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.jfire.socket.socketserver.handler.MessageAction;
import com.jfire.socket.socketserver.handler.MessageHandler;
import com.jfire.socket.socketserver.interceptor.MessageInterceptor;

/**
 * In-process {@link MessageHandlerCenter}: messages offered through
 * {@link #offerMessage(Message)} are queued, and the {@link #run()} loop hands them to
 * a fixed-size worker pool while capping the number of in-flight tasks at
 * {@code taskLimit}.
 *
 * <p>{@link #init()} must be called before {@link #run()}, because it creates the
 * {@link MessageAction}, the worker pool and the completion service that the loop uses.
 *
 * <p>NOTE(review): the {@code @Resource} annotations presumably let a DI container
 * manage this class and inject the handler/interceptor lists - confirm against the
 * container configuration.
 */
@Resource
public class LocalHandlerCenter implements MessageHandlerCenter
{
    /** Handlers passed to the {@link MessageAction} built in {@link #init()}. */
    @Resource
    private List<MessageHandler>            handlerList     = new ArrayList<>();
    /** Interceptors passed to the {@link MessageAction} built in {@link #init()}. */
    @Resource
    private List<MessageInterceptor>        interceptorList = new ArrayList<>();
    /**
     * The single task object submitted for every message; created in {@link #init()}.
     * NOTE(review): the same instance is submitted repeatedly, so it presumably keeps
     * its own queue of pending messages - confirm in MessageAction.
     */
    private MessageAction                   messageAction;
    /** Messages waiting to be dispatched by {@link #run()}. */
    private BlockingQueue<Message>          waitList        = new LinkedBlockingQueue<>();
    /** Worker pool size and in-flight task cap: twice the available processor count. */
    private int                             taskLimit       = Runtime.getRuntime().availableProcessors() * 2;
    /** Number of submitted tasks not yet reaped; within this class only run() touches it. */
    private int                             runningTask     = 0;
    /** Worker pool; {@code null} until {@link #init()} has run. */
    private ExecutorService                 threadPool      = null;
    /** Completion service over {@link #threadPool}; {@code null} until {@link #init()} has run. */
    private ExecutorCompletionService<Void> executorPool    = null;
    private Logger                          logger          = LogManager.getLogger();
    
    /**
     * Dispatch loop. Blocks until a message is available, registers it with the
     * {@link MessageAction}, submits that action to the worker pool and then reaps
     * finished tasks. When the number of unreaped tasks reaches {@code taskLimit} the
     * loop blocks until one of them completes, which throttles submission.
     *
     * <p>The loop ends when the thread is interrupted (the interrupt status is restored
     * before returning) or when any other exception is thrown, which is logged.
     */
    @Override
    public void run()
    {
        try
        {
            while (true)
            {
                Message msg = waitList.take();
                messageAction.addMessage(msg);
                executorPool.submit(messageAction);
                runningTask++;
                // Reap every task that has already finished, without blocking.
                while (executorPool.poll() != null)
                {
                    runningTask--;
                }
                // At the cap: wait for one task to finish before accepting more work.
                if (runningTask >= taskLimit)
                {
                    executorPool.take();
                    runningTask--;
                }
            }
        }
        catch (InterruptedException e)
        {
            logger.info("{}被中断", Thread.currentThread().getName());
            // Restore the flag so the owner of this thread can still observe the interrupt.
            Thread.currentThread().interrupt();
        }
        catch (Exception e)
        {
            logger.error("{}异常", Thread.currentThread().getName(), e);
        }
    }
    
    /**
     * Queues a message for dispatch by {@link #run()}.
     *
     * @param message the message to enqueue
     */
    public void offerMessage(Message message)
    {
        waitList.offer(message);
    }
    
    /**
     * Shuts the worker pool down immediately and waits up to 50 seconds for its threads
     * to terminate. Does nothing if {@link #init()} has not been called.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling
     *                          thread is interrupted while waiting; the interrupt status
     *                          is restored before the exception is thrown
     */
    public void stop()
    {
        if (threadPool == null)
        {
            // init() never ran, so there is no pool to shut down.
            return;
        }
        threadPool.shutdownNow();
        try
        {
            threadPool.awaitTermination(50, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            // Keep the interrupt visible to callers that catch the RuntimeException.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
    
    /**
     * Builds the {@link MessageAction} from the current interceptor and handler lists
     * and creates the fixed-size worker pool ({@code taskLimit} threads) together with
     * the completion service wrapped around it.
     */
    @Override
    public void init()
    {
        MessageInterceptor[] interceptors = interceptorList.toArray(new MessageInterceptor[0]);
        messageAction = new MessageAction(interceptors, handlerList.toArray(new MessageHandler[0]));
        threadPool = Executors.newFixedThreadPool(taskLimit, new ThreadFactory() {
            // Sequence number appended to each worker thread's name, starting at 1.
            private int i = 1;
            
            @Override
            public Thread newThread(Runnable r)
            {
                Thread thread = new Thread(r, "业务处理线程-" + i);
                i++;
                return thread;
            }
        });
        executorPool = new ExecutorCompletionService<Void>(threadPool);
    }
    
    /**
     * @return the live handler list held by this center (not a copy)
     */
    @Override
    public List<MessageHandler> getHandlerList()
    {
        return handlerList;
    }
    
    /**
     * @return the live interceptor list held by this center (not a copy)
     */
    @Override
    public List<MessageInterceptor> getInterceptorList()
    {
        return interceptorList;
    }
    
}
